{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d2a8e928",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _components.helpers"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3d7d267",
   "metadata": {},
   "source": [
    "# Internal helpers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b84d16ae",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def in_notebook() -> bool:\n",
    "    \"\"\"\n",
    "    Checks if the code is running in a Jupyter notebook or not.\n",
    "\n",
    "    Returns:\n",
    "        True if running in a Jupyter notebook, False otherwise.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        from IPython import get_ipython\n",
    "\n",
    "        if \"IPKernelApp\" not in get_ipython().config:\n",
    "            return False\n",
    "    except ImportError:\n",
    "        return False\n",
    "    except AttributeError:\n",
    "        return False\n",
    "    return True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9930711d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "in_notebook()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5d91bc9a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "import contextlib\n",
    "import importlib\n",
    "import os\n",
    "import sys\n",
    "from datetime import datetime, timedelta\n",
    "from inspect import Parameter\n",
    "from typing import *\n",
    "\n",
    "import typer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "69fb3adc",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from pathlib import Path\n",
    "from tempfile import TemporaryDirectory\n",
    "\n",
    "from fastkafka._aiokafka_imports import AIOKafkaConsumer, AIOKafkaProducer\n",
    "from nbdev_mkdocs.docstring import run_examples_from_docstring\n",
    "\n",
    "from fastkafka._application.app import FastKafka\n",
    "from fastkafka._components.logger import suppress_timestamps\n",
    "from fastkafka._components.test_dependencies import generate_app_src"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "254e1819",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@contextlib.contextmanager\n",
    "def change_dir(d: str) -> Generator[None, None, None]:\n",
    "    \"\"\"\n",
    "    Changes the current working directory temporarily.\n",
    "\n",
    "    Args:\n",
    "        d: The directory to change to.\n",
    "\n",
    "    Yields:\n",
    "        None.\n",
    "    \"\"\"\n",
    "    curdir = os.getcwd()\n",
    "    os.chdir(d)\n",
    "    try:\n",
    "        yield\n",
    "    finally:\n",
    "        os.chdir(curdir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ac1eac6b",
   "metadata": {},
   "outputs": [],
   "source": [
    "with TemporaryDirectory() as d:\n",
    "    original_wd = os.getcwd()\n",
    "    assert original_wd != d\n",
    "    with change_dir(d):\n",
    "        assert os.getcwd() == d\n",
    "    assert os.getcwd() == original_wd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "22ff9e03",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "class ImportFromStringError(Exception):\n",
    "    pass\n",
    "\n",
    "\n",
    "def _import_from_string(import_str: str) -> Any:\n",
    "    \"\"\"Imports library from string\n",
    "\n",
    "    Note:\n",
    "        copied from https://github.com/encode/uvicorn/blob/master/uvicorn/importer.py\n",
    "\n",
    "    Args:\n",
    "        import_str: input string in form 'main:app'\n",
    "\n",
    "    \"\"\"\n",
    "    sys.path.append(\".\")\n",
    "\n",
    "    if not isinstance(import_str, str):\n",
    "        return import_str\n",
    "\n",
    "    module_str, _, attrs_str = import_str.partition(\":\")\n",
    "    if not module_str or not attrs_str:\n",
    "        message = (\n",
    "            'Import string \"{import_str}\" must be in format \"<module>:<attribute>\".'\n",
    "        )\n",
    "        typer.secho(f\"{message}\", err=True, fg=typer.colors.RED)\n",
    "        raise ImportFromStringError(message.format(import_str=import_str))\n",
    "\n",
    "    try:\n",
    "        # nosemgrep: python.lang.security.audit.non-literal-import.non-literal-import\n",
    "        module = importlib.import_module(module_str)\n",
    "    except ImportError as exc:\n",
    "        if exc.name != module_str:\n",
    "            raise exc from None\n",
    "        message = 'Could not import module \"{module_str}\".'\n",
    "        raise ImportFromStringError(message.format(module_str=module_str))\n",
    "\n",
    "    instance = module\n",
    "    try:\n",
    "        for attr_str in attrs_str.split(\".\"):\n",
    "            instance = getattr(instance, attr_str)\n",
    "    except AttributeError:\n",
    "        message = 'Attribute \"{attrs_str}\" not found in module \"{module_str}\".'\n",
    "        raise ImportFromStringError(\n",
    "            message.format(attrs_str=attrs_str, module_str=module_str)\n",
    "        )\n",
    "\n",
    "    return instance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "85e14aa3",
   "metadata": {},
   "outputs": [],
   "source": [
    "with TemporaryDirectory() as d:\n",
    "    src_path = Path(d) / \"main.py\"\n",
    "    generate_app_src(src_path)\n",
    "    with change_dir(d):\n",
    "        kafka_app = _import_from_string(f\"{src_path.stem}:kafka_app\")\n",
    "        assert isinstance(kafka_app, FastKafka)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "77ddf011",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def true_after(seconds: Union[int, float]) -> Callable[[], bool]:\n",
    "    \"\"\"Function returning True after a given number of seconds\"\"\"\n",
    "    t = datetime.now()\n",
    "\n",
    "    def _true_after(seconds: Union[int, float] = seconds, t: datetime = t) -> bool:\n",
    "        return (datetime.now() - t) > timedelta(seconds=seconds)\n",
    "\n",
    "    return _true_after"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "44a89246",
   "metadata": {},
   "outputs": [],
   "source": [
    "f = true_after(1.1)\n",
    "assert not f()\n",
    "time.sleep(1)\n",
    "assert not f()\n",
    "time.sleep(0.1)\n",
    "assert f()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "652fe0a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "def unwrap_list_type(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:\n",
    "    \"\"\"\n",
    "    Unwraps the type of a list.\n",
    "\n",
    "    Vars:\n",
    "        var_type: Type to unwrap.\n",
    "\n",
    "    Returns:\n",
    "        Unwrapped type if the given type is a list, otherwise returns the same type.\n",
    "\n",
    "    Example:\n",
    "        - Input: List[str]\n",
    "          Output: str\n",
    "        - Input: int\n",
    "          Output: int\n",
    "    \"\"\"\n",
    "    if hasattr(var_type, \"__origin__\") and var_type.__origin__ == list:\n",
    "        return var_type.__args__[0]  # type: ignore\n",
    "    else:\n",
    "        return var_type"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cac74052",
   "metadata": {},
   "outputs": [],
   "source": [
    "assert unwrap_list_type(List[int]) == int\n",
    "assert unwrap_list_type(int) == int"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3d35daef",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def remove_suffix(topic: str) -> str:\n",
    "    \"\"\"\n",
    "    Removes the suffix from a string by splitting on underscores and joining all but the last element.\n",
    "\n",
    "    Args:\n",
    "        topic: The string to remove the suffix from.\n",
    "\n",
    "    Returns:\n",
    "        The string without the suffix.\n",
    "    \"\"\"\n",
    "    return \"_\".join(topic.split(\"_\")[:-1])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "736c2261",
   "metadata": {},
   "outputs": [],
   "source": [
    "assert remove_suffix(\"on_topic.hello_1\") == \"on_topic.hello\""
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
